Java NIO - Netty 的一些简单案例及半包问题的演示

EchoServer 案例

服务端的实现

NettyEchoServer:功能极其简单,服务端读取客户端输入的数据,然后将数据直接回显到控制台。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

/**
* @author KJ
* @description NettyEchoServer
*/
public class NettyEchoServer {
private final int port;
ServerBootstrap b = new ServerBootstrap();

public NettyEchoServer(int port) {
this.port = port;
}

public void runServer() {
// 1. 创建反应器轮询组:bossGroup 负责接收连接,workerGroup 负责处理具体的 I/O 读写
EventLoopGroup bossLoopGroup = new MultiThreadIoEventLoopGroup(1, NioIoHandler.newFactory());
EventLoopGroup workerLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
try {
// 2. 设置反应器轮询组
b.group(bossLoopGroup, workerLoopGroup);

// 3. 设置通道类型:使用 NIO 的服务端 TCP 通道
b.channel(NioServerSocketChannel.class);

// 4. 设置监听端口
b.localAddress(new InetSocketAddress(port));

// 5. 设置通道选项 (例如:允许端口复用,设置 TCP 积压队列大小)
b.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true);

// 6. 装配子通道流水线 (Pipeline)
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 向子通道流水线添加共享的 Handler 实例
ch.pipeline().addLast(NettyEchoServerHandler.INSTANCE);
}
});

// 7. 绑定服务器,并异步地阻断直到绑定完成
ChannelFuture f = b.bind().sync();
System.out.println("Echo 服务器启动成功,监听端口:" + port);

// 8. 等待通道关闭的异步任务结束
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 9. 优雅关闭反应器轮询组,释放所有资源
bossLoopGroup.shutdownGracefully();
workerLoopGroup.shutdownGracefully();
}
}


/**
* @ChannelHandler.Sharable 用来标注一个Handler实例可以被多个通道安全地共享。
* 多个通道的流水线可以加入同一个Handler实例。这种共享操作,Netty 默认是不允许的。
* 很多应用场景都需要Handler实例能共享。例如,一个服务器处理十万以上的通道,如果
* 每一个通道都新建很多重复的Handler实例,就会浪费很多宝贵的空间,降低了服务器的性能。
* 所以,如果在Handler实例中没有与特定通道强相关的数据或者状态,建议设计成共享模式。
*
* ChannelHandlerAdapter.isSharable() 可以判断一个Handler是否为可共享。
* 如果其对应的实现加上了@Sharable注解,那么这个方法将返回 true。
*
* NettyEchoServerHandler没有保存与任何通道连接相关的数据,也没有内部的其他数据需要保存。
* 所以,该处理器不仅仅可以用来共享,而且不需要做任何同步控制。这里为它加上了@Sharable注解,
* 表示可以共享。更进一步,这里还设计为了INSTANCE静态实例,所有的通道直接使用这个实例即可。
*
*/
@ChannelHandler.Sharable // 未加该注解,试图将同一个Handler实例添加到多个ChannelPipeline,则会抛异常。
static class NettyEchoServerHandler extends ChannelInboundHandlerAdapter {
public static final NettyEchoServerHandler INSTANCE = new NettyEchoServerHandler();

/**
* 回显服务器处理器的逻辑分为两步:
*
* 第一步,读取从对端输入的数据。channelRead() 方法的 msg 参数的形参类型不是ByteBuf,而是Object,
* 这是由流水线的上一站决定的。一般而言,入站处理的流程是:Netty读取底层的二进制数据,填充到msg时,
* msg是ByteBuf类型,然后经过流水线,传入第一个入站处理器;每一个节点处理完后,将自己的处理结果作
* 为msg参数不断向后传递。因此,msg参数的形参类型只能是Object类型。第一个入站处理器的channelRead
* 方法的msg类型绝对是ByteBuf类型,因为它是Netty读取到的ByteBuf数据包。另外,从Netty 4.1开始,
* ByteBuf 的默认类型是 Direct ByteBuf。注意,Java不能直接访问Direct ByteBuf内部的数据,必须
* 通过调用 getBytes()、readBytes() 等方法将数据读入Java数组中才能继续进行处理。
*
* 第二步,将数据写回客户端。这一步很简单,直接复用前面的msg实例即可。不过要注意,如果上一步调用的是
* readBytes() 方法,那么这一步就不能直接将msg写回了,因为数据已经被readBytes()方法读完了。幸好,
* 上一步调用的读数据方法是 getBytes(),它不影响 ByteBuf 的数据指针,因此可以继续使用。这里除了调用
* ctx.writeAndFlush()方法把msg数据写回客户端之外,也可调用通道的ctx.channel().writeAndFlush()
* 方法发送数据。这两种方法在这里的效果是一样的,因为这个流水线上没有任何出站处理器。
*
* 注:假设你的 Pipeline 顺序是:
* Head ⇄ Encoder_A ⇄ Encoder_B ⇄ Your_Handler ⇄ Tail
* 调用 ctx.writeAndFlush():数据流向是 Your_Handler ➔ Encoder_A ➔ Head,它跳过了 Encoder_B。
* 调用 ctx.channel().writeAndFlush():数据流向是 Tail ➔ Encoder_B ➔ Encoder_A ➔ Head,它经过所有处理器。
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;

// 根据 ByteBuf 的存储属性判断是堆内存还是直接内存
System.out.println("msg type: " + (in.hasArray() ? "堆内存" : "直接内存"));

int len = in.readableBytes();
byte[] arr = new byte[len];
in.getBytes(0, arr);

System.out.println("server received: " + new String(arr, StandardCharsets.UTF_8));
System.out.println("写回前,msg.refCnt:" + in.refCnt());

// 零拷贝机制:直接写回原始数据包,Netty 会在发送完成后自动 release
ChannelFuture f = ctx.writeAndFlush(msg); // 从当前位置开始向 Header(头部)方向传播;而 ctx.channel().writeAndFlush() 从尾部开始向头部传播

f.addListener((ChannelFuture future) -> {
// 回调中再次查看引用计数(通常此时由于发送完毕已减 1)
System.out.println("写回任务状态:" + (future.isSuccess() ? "成功\n" : "失败\n"));
// 注意:在异步完成后访问计数仅用于观察,不应再进行读写操作
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 发生异常时关闭连接
cause.printStackTrace();
ctx.close();
}
}

public static void main(String[] args) {
new NettyEchoServer(9000).runServer();
}
}


客户端的实现

NettyEchoClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioIoHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

/**
* @author KJ
* @description NettyEchoClient
*/
public class NettyEchoClient {
private final int serverPort;
private final String serverIp;
Bootstrap b = new Bootstrap();

public NettyEchoClient(String ip, int port) {
this.serverPort = port;
this.serverIp = ip;
}

/**
* 客户端在成功连接到服务端后不断循环获取控制台的输入,通过与服务端之间的连接通道发送到服务器。
*/
public void runClient() {
// 创建反应器轮询组
EventLoopGroup workerLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
try {
// 1.设置反应器轮询组
b.group(workerLoopGroup);
// 2.设置nio类型的通道
b.channel(NioSocketChannel.class);
// 3.设置监听端口
b.remoteAddress(serverIp, serverPort);
// 4.设置通道的参数
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 5.装配子通道流水线
b.handler(new ChannelInitializer<SocketChannel>() {
// 有连接到达时会创建一个通道
protected void initChannel(SocketChannel ch) {
// 管理子通道中的Handler
// 向子通道流水线添加一个Handler
ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
}
});

ChannelFuture future = b.connect();
future.addListener((ChannelFuture futureListener) -> {
if (futureListener.isSuccess()) {
System.out.println("EchoClient客户端连接成功!");
} else {
System.out.println("EchoClient客户端连接失败!");
}
});
future.sync(); // 阻塞,直到连接成功

Channel channel = future.channel();
Scanner scanner = new Scanner(System.in);
System.out.println("请输入发送内容:");
while (scanner.hasNext()) {
// 获取输入的内容
String next = scanner.next();
byte[] bytes = next.getBytes(StandardCharsets.UTF_8);
// 发送ByteBuf
ByteBuf buffer = channel.alloc().buffer();
buffer.writeBytes(bytes);
channel.writeAndFlush(buffer);
System.out.println("请输入发送内容:");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅关闭EventLoopGroup,
// 释放掉所有资源,包括创建的线程
workerLoopGroup.shutdownGracefully();
}
}

@ChannelHandler.Sharable
static class NettyEchoClientHandler extends ChannelInboundHandlerAdapter {
public static final NettyEchoClientHandler INSTANCE = new NettyEchoClientHandler();

@Override // 入站处理方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
int len = byteBuf.readableBytes();
byte[] arr = new byte[len];
byteBuf.getBytes(0, arr);
System.out.println("client received: " + new String(arr, StandardCharsets.UTF_8));
// 释放ByteBuf的两种方法
// 方法一:手动释放ByteBuf
byteBuf.release();
// 方法二:调用父类的入站方法,将msg向后传递
// super.channelRead(ctx,msg);
}
}

public static void main(String[] args) {
NettyEchoClient nettyEchoClient = new NettyEchoClient("127.0.0.1", 9000);
nettyEchoClient.runClient();
}
}


半包问题的复现

问题演示

改造一下前面的 NettyEchoClient 实例,通过循环的方式向 NettyEchoServer 回显服务器写入大量的 ByteBuf,然后看看实际的服务器响应结果。注意:服务器类不需要改造,直接使用之前的回显服务器即可。改造好的客户端类——叫 NettyDumpSendClient。在客户端建立连接成功之后,使用一个 for 循环不断通过通道向服务端发送ByteBuf, 一直写到1000次,这些ByteBuf的内容相同,都是相同的字符串内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class NettyDumpSendClient {
private final String serverIp;
private final int serverPort;
Bootstrap b = new Bootstrap();

public NettyDumpSendClient(String ip, int port) {
this.serverPort = port;
this.serverIp = ip;
}

/**
* 客户端在成功连接到服务端后不断循环获取控制台的输入,通过与服务端之间的连接通道发送到服务器。
*/
public void runClient() {
// 创建反应器轮询组
EventLoopGroup workerLoopGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
try {
// 1.设置反应器轮询组
b.group(workerLoopGroup);
// 2.设置nio类型的通道
b.channel(NioSocketChannel.class);
// 3.设置监听端口
b.remoteAddress(serverIp, serverPort);
// 4.设置通道的参数
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
// 5.组装处理流水线
b.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
// 哪怕现在逻辑为空,也必须初始化 Pipeline
// 后续如果你有入站处理逻辑,可以在这里 addLast
ch.pipeline().addLast(NettyEchoClientHandler.INSTANCE);
}
});

ChannelFuture future = b.connect();
future.addListener((ChannelFuture futureListener) -> {
if (futureListener.isSuccess()) {
System.out.println("NettyDumpSendClient客户端连接成功!");
} else {
System.out.println("NettyDumpSendClient客户端连接失败!");
}
});
future.sync(); // 阻塞,直到连接成功

Channel channel = future.channel();
//发送大量的文字
String content = "密涅瓦的猫头鹰在黄昏起飞。";
byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
for (int i = 0; i < 1000; i++) {
// 发送ByteBuf
ByteBuf buffer = channel.alloc().buffer();
buffer.writeBytes(bytes);
channel.writeAndFlush(buffer);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 优雅关闭EventLoopGroup,
// 释放掉所有资源,包括创建的线程
workerLoopGroup.shutdownGracefully();
}
}

@ChannelHandler.Sharable
static class NettyEchoClientHandler extends ChannelInboundHandlerAdapter {
public static final NettyEchoClient.NettyEchoClientHandler INSTANCE = new NettyEchoClient.NettyEchoClientHandler();

@Override // 入站处理方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
int len = byteBuf.readableBytes();
byte[] arr = new byte[len];
byteBuf.getBytes(0, arr);
System.out.println("client received: " + new String(arr, StandardCharsets.UTF_8));
byteBuf.release();
}
}

public static void main(String[] args) {
NettyDumpSendClient client = new NettyDumpSendClient("127.0.0.1", 9000);
client.runClient();
}
}

仔细观察服务端的控制台输出,可以看出存在三种类型的输出:

  • 读到一个完整的客户端输入ByteBuf。
  • 读到多个客户端的ByteBuf输入,但是“粘”在了一起。
  • 读到部分ByteBuf的内容,并且有乱码。

除了观察服务端的输出之外,再仔细观察客户端的输出,可以看 到客户端也存在以上三种类型的输出。对应于第1种情况接收到的完整的ByteBuf,这里称为“全包”。 对应于第2种情况,多个发送端的输入ByteBuf“粘”在了一起,这里 称为“粘包”。对应于第3种情况,一个发送过来的ByteBuf被“拆 开”接收,接收端读取到一个破碎的包,这里称为“半包”。为了简单起见,也可以将“粘包”的情况看成特殊的“半包”。 “粘包”和“半包”可以统称为传输的“半包问题”。


半包问题的本质

半包问题包含了 “粘包” 和 “半包” 两种情况:

  • 粘包:接收端(Receiver)收到一个ByteBuf,包含了发送端(Sender)的多个ByteBuf,发送端的多个ByteBuf在接收端“粘” 在了一起。
  • 半包:Receiver将Sender的一个ByteBuf“拆”开了收,收 到多个破碎的包。换句话说,Receiver收到了Sender的一个ByteBuf的 一小部分。

无论是粘包还是半包都不是一次正常的ByteBuf缓存区接收,具体如图所示:

粘包和半包的来源得从操作系统底层说起。我们知道,底层网络是以二进制字节报文的形式来传输数据的。读数据的过程大致为:当IO可读时,Netty 会从底层网络将二进制数据读到ByteBuf缓冲区中,再交给Netty程序转成Java POJO对象。写数据的过程大致为:编码器将一个Java类型的数据转换成底层能够传输的二进制ByteBuf缓冲数据。

在发送端 Netty 的应用层进程缓冲区中,程序以 ByteBuf 为单位来发送数据,但是到了底层操作系统内核缓冲区,底层会按照协议的规范对数据包进行二次封装,封装成传输层的协议报文,再进行发送。 在接收端收到传输层的二进制包后,首先复制到内核缓冲区,Netty读取ByteBuf时才复制到应用的用户缓冲区。在接收端,当Netty程序将数据从内核缓冲区复制到用户缓冲区的 ByteBuf时,问题来了:

  • 每次读取底层缓冲的数据容量是有限制的,当TCP内核缓冲区的数据包比较大时,可能会将一个底层包分成多次ByteBuf进行复制,进而造成用户缓冲区读到的是半包。
  • 当TCP内核缓冲区的数据包比较小时,一次复制的是不止一个内核缓冲区包,进而会造成用户缓冲区读到粘包。

如何解决呢?基本思路是,在接收端,Netty程序需要根据自定义协议将读取到的进程缓冲区ByteBuf在应用层进行二次组装,重新组装应用层的数据包。接收端的这个过程通常也称为分包或者拆包。在Netty中分包的方法主要有以下两种:

  • 可以自定义解码器分包器:基于 ByteToMessageDecoder 或者 ReplayingDecoder,定义自己的用户缓冲区分包器。
  • 使用 Netty 内置的解码器。例如,可以使用 Netty 内置的 LengthFieldBasedFrameDecoder 自定义长度数据包解码器对用户缓冲区 ByteBuf 进行正确的分包。